Real Time Predictive Analytics [Tweets Analysis]
Techniques used:
1. TF-IDF
2. NaiveBayes Classification
A socket server is used to publish data from a file to port 9000 (a sketch follows).
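A minimal sketch of what socket_server.py might look like (the file name, the one-tweet-per-line format of data/movietweets.csv, and the 1-second delay between sends are assumptions):

# socket_server.py -- publish tweets line by line to port 9000
import socket
import time

HOST, PORT = "localhost", 9000
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((HOST, PORT))
server.listen(1)
print("Waiting for a client on port", PORT)
conn, addr = server.accept()
with open("data/movietweets.csv") as f:
    for line in f:
        # Drop the sentiment label; publish only the tweet text
        text = line.strip().split(",", 1)[1]
        print("Publishing:", text)
        conn.sendall((text + "\n").encode("utf-8"))
        time.sleep(1)
conn.close()
server.close()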
# -*- coding: utf-8 -*-
import os
import sys
os.chdir("/home/cloudops/spark")
os.getcwd()   # verify the working directory
# Configure the environment. Set this up to the directory where
# Spark is installed
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = '/opt/spark'
# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']
# Add the following paths to the system path. Please check your installation
# to make sure that these zip files actually exist. The names might change
# as versions change.
sys.path.insert(0,os.path.join(SPARK_HOME,"python"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","pyspark.zip"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","py4j-0.9-src.zip"))
# Initiate Spark context. Once this is done all other applications can run
from pyspark import SparkContext
from pyspark import SparkConf
# Optionally configure Spark Settings
conf=SparkConf()
conf.set("spark.executor.memory", "1g")
conf.set("spark.cores.max", "2")
conf.setAppName("Spark-Analysis")
## Initialize the SparkContext. Run only once,
# otherwise you get a "multiple SparkContexts" error.
# For streaming, create a Spark context with 2 threads.
sc = SparkContext('local[2]', conf=conf)
# <SparkContext master=local[2] appName=Spark-Analysis>
# =====================================
# Building and saving the Model
# =====================================
tweetData = sc.textFile("data/movietweets.csv")
tweetData.collect()
# ['positive,The Da Vinci Code book is just awesome.',
# 'positive,i liked the Da Vinci Code a lot.',
# 'positive,i liked the Da Vinci Code a lot.',
# . . .
# Keep only the tweet text; split on the first comma so commas
# inside a tweet are preserved
tweetText = tweetData.map(lambda line: line.split(",", 1)[1])
tweetText.collect()
# ['The Da Vinci Code book is just awesome.',
# 'i liked the Da Vinci Code a lot.',
# 'i liked the Da Vinci Code a lot.',
# . . .
# =====================================
# Use TF-IDF
# =====================================
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
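# HashingTF maps each term to a vector index via the hashing trick;
# its default dimension is 2**20 = 1,048,576, which is why the sparse
# vectors below have size 1048576. Note that transform() expects each
# document as a list of terms -- passing raw strings, as done here,
# hashes individual characters; for word-level features you would
# split first, e.g. tweetText.map(lambda t: t.split(" ")).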
hashingTF = HashingTF()
tf = hashingTF.transform(tweetText)
tf.cache()
# Transform to IDF
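# minDocFreq=2 gives terms that appear in fewer than two documents an
# IDF weight of 0.0 (visible as zero entries in the vectors below)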
idf = IDF(minDocFreq=2).fit(tf)
tfidf = idf.transform(tf)
tfidf.cache()
tfidf.count() # 100 TF-IDF vectors, one per tweet
# Pair each original record with its TF-IDF vector
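# (RDD.zip requires both RDDs to have the same number of partitions
# and elements per partition; that holds here because tfidf was
# derived from tweetData by element-wise transformations.)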
xformedData = tweetData.zip(tfidf)
xformedData.cache()
xformedData.collect()
# Sparse TF-IDF vectors paired with the original labeled text:
# [('positive,The Da Vinci Code book is just awesome.',
# SparseVector(1048576, {105642: 0.2713, 151034: 0.3524, 173013: 0.9262, 173606: 0.2976,
# 186435: 0.7439, 211440: 1.0999, 238153: 0.1015,
# 244458: 3.0057, 263483: 0.3247, 265159: 0.2031,
# 296409: 0.1616, 335453: 0.8775, 469732: 1.1494,
# 702216: 0.1212, 702740: 0.0508, 734443: 0.472,
# 777769: 0.9516, 793623: 0.3111, 875351: 0.0,
# 891534: 0.0508, 897367: 0.6833, 968035: 0.0508})),
# . . .
# =====================================
from pyspark.mllib.regression import LabeledPoint
def convertToLabeledPoint(inVal):
    origAttr = inVal[0].split(",", 1)
    # Encode the label: 0.0 = positive, 1.0 = negative
    sentiment = 0.0 if origAttr[0] == "positive" else 1.0
    return LabeledPoint(sentiment, inVal[1])
# Convert each (text, TF-IDF vector) pair to a LabeledPoint
tweetLp = xformedData.map(convertToLabeledPoint)
tweetLp.cache()
tweetLp.collect()
# [LabeledPoint(0.0, (1048576,[105642,151034,173013,173606,186435,211440,238153,
# 244458,263483,265159,296409,335453,469732,702216,702740,734443,777769,793623,
# 875351,891534,897367,968035],
# . . .
# =====================================
# NaiveBayes Classification
# =====================================
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
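# The second argument to train() is the additive (Laplace) smoothing
# parameter lambda; 1.0 is the MLlib default.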
model = NaiveBayes.train(tweetLp, 1.0)
predictionAndLabel = tweetLp.map(
    lambda p: (float(model.predict(p.features)), float(p.label)))
predictionAndLabel.collect()
# [(0.0, 0.0),
# (1.0, 0.0),
# (1.0, 0.0),
# . . .
# =====================================
# Convert to DF and form the Confusion Matrix
# =====================================
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
predDF = sqlContext.createDataFrame(predictionAndLabel.collect(),
                                    ["prediction", "label"])
predDF.groupBy("label", "prediction").count().show()
# Confusion matrix: label vs. prediction counts
# +-----+----------+-----+
# |label|prediction|count|
# +-----+----------+-----+
# | 1.0| 1.0| 47|
# | 0.0| 1.0| 15| <-- small data set
# | 1.0| 0.0| 3|
# | 0.0| 0.0| 35|
# +-----+----------+-----+
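# Overall accuracy, computed from predictionAndLabel (a minimal
# sketch; it matches the matrix above: (47 + 35) / 100 = 0.82):
correct = predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count()
print(correct / float(predictionAndLabel.count()))  # 0.82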
# =====================================
# Save the Model
# model.save(sc,"TweetsSentimentModel")
# =====================================
import pickle
with open('tweetsSentiModel', 'wb') as f:
    pickle.dump(model, f)
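# Note: pickling works here because mllib's NaiveBayesModel keeps its
# parameters (labels, pi, theta) locally in Python; for JVM-backed
# models, prefer the native model.save()/load() shown above.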
# =====================================
# Real Time
# Getting tweets in Real Time and making predictions
# =====================================
import pickle
from pyspark.mllib.classification import NaiveBayesModel
with open('tweetsSentiModel', 'rb') as f:
    loadedModel = pickle.load(f)
# =====================================
# Create a StreamingContext
# =====================================
from pyspark.streaming import StreamingContext
# 1-second batch interval
streamContext = StreamingContext(sc, 1)
# stream the data from socket server (open socket on port 9000)
# run socket_server.py
tweets = streamContext.socketTextStream("localhost", 9000)
# =====================================
# Action - generate tweets and predict
# =====================================
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
# Broadcast the trained model to the executors
bc_model = sc.broadcast(loadedModel)
def predictSentiment(tweetText):
    '''Predict sentiment for every tweet in the micro-batch RDD.'''
    nbModel = bc_model.value
    # HashingTF transform
    hashingTF = HashingTF()
    tf = hashingTF.transform(tweetText)
    tf.cache()
    # IDF transform: text -> TF-IDF vector. Note that the IDF is refit
    # on every micro-batch, so the weights can differ from those used
    # at training time; reusing the training-time IDF model would be
    # more consistent.
    idf = IDF(minDocFreq=2).fit(tf)
    tfidf = idf.transform(tf)
    tfidf.cache()
    # Use the broadcast model for prediction
    prediction = nbModel.predict(tfidf)
    # Collect once, rather than calling collect() inside the loop
    print("Predictions for this window:", end=' ')
    for pred, text in zip(prediction.collect(), tweetText.collect()):
        print(pred, text)
# foreachRDD calls predictSentiment on the RDD produced by each
# micro-batch, predicting the sentiment of every tweet in that batch
tweets.foreachRDD(predictSentiment)
# Run streaming
streamContext.start()
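# In a standalone script you would normally block here with
# streamContext.awaitTermination(); in an interactive session the
# stream runs in the background until streamContext.stop() is called.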
# Socket server:
# Publishing: The Da Vinci Code sucked
# Publishing: The Da Vinci Code was awesome...
# Publishing: da vinci code is awesome.
# Publishing: oh so beautiful Da Vinci Code...
# . . .
# Predictions for this window: 0.0 Client on 127.0.0.1The Da Vinci Code sucked
# Predictions for this window: 0.0 The Da Vinci Code was awesome...
# Predictions for this window: 0.0 da vinci code is awesome.
# . . .
streamContext.stop()